ReentrantLock 与 AQS 源码分析
1. 在阅读源码时做了大量的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限,并且代码阅读起来没有 IDE 方便,所以在 github 上提供JDK1.8 的源码、详细的注释及测试用例。欢迎大家 star、fork !
2. 由于个人水平有限,对源码的分析理解可能存在偏差或不透彻的地方还请大家在评论区指出,谢谢!
1. 基本结构
重入锁 ReetrantLock,JDK 1.5新增的类,作用与synchronized关键字相当,但比synchronized更加灵活。ReetrantLock本身也是一种支持重进入的锁,即该锁可以支持一个线程对资源重复加锁,但是加锁多少次,就必须解锁多少次,这样才可以成功释放锁。
1. 继承
没有继承任何类,因为很多操作都使用了组合完成。
2. 实现
Lock, java.io.Serializable
这里着重介绍一下 Lock 接口,接口定义了几个必要的方法,也是在 ReentrantLock 中的重点需要分析的方法。
三类方法:获取锁、释放锁、获取条件。
1 | public interface Lock { |
从上面可以看到 Synchronized 和 Lock 的一些重要区别:
Lock 的获取锁的过程是可以中断的,Synchronized 不可以,Synchronized 只能在 wait或同步代码块执行过程中才可以被中断。
由于 Lock 显示的加锁,锁可以横跨几个方法,也就是临界区的位置可以更加自由。
Lock 支持超时获取锁。
后面会看到 Lock 还支持公平及非公平锁。
绑定多个 Condition 条件
3. 主要字段
很好,这个类的字段非常的少,真正起作用的字段只有一个 “锁” 字段。
1 | // 同步锁 |
这个锁(Sync)是一个继承自 AQS 的抽象内部类,说明一下 AQS (AbstractQueuedSynchronizer) 一般被称为队列同步器,他是并发包中的核心组件,绝大多数锁机制都是采用的这个类来实现的。虽然看到他是一个抽象类,但是你会发现里面没有一个方法是抽象方法,他实现了锁机制中的必要的通用的方法,待会会专门讲这个类。不然 ReentrantLock 没办法说,ReentrantLock 里面的锁操作都是依赖于 AQS。
然后这个锁是有两个子类,分别是 NonfairSync
和 FairSync
从名字上也可以看出这两个类分别代表了 公平锁
和 非公平锁
。何为锁的公平性? 实际上就是新来的线程需要征用锁必须要要等到先于他到达的线程获取并释放锁。也就是获取锁的过程是按照下来后到的顺序进行的,反之就称为非公平锁。后面我们会看到其实这两种锁不同就在于非公平锁在新线程创建后首先会直接进行锁的获取,如果没有获取到会进行一段时间的自旋,始终没获取到锁才进行等待状态。
一般而言,公平锁开销比非公平锁大,这也是比较符合我们的直观感受。公平锁是需要进行排队的,但在某些场景下,可能更注重时间先后顺序,那么公平锁自然是很好的选择。
好总结一下,在 ReentrantLock 中只维护了一个 “锁” 变量,这个锁是继承了 AQS 同步器,然后这个锁又有两种派生的锁:公平锁,非公平锁。那么 ReentrantLock 实现其实就有两种方式:公平锁,非公平锁。
4. 主要方法概览
- ctor-2
- lock
- lockInterruptibly
- tryLock
- tryLock(time)
- unlock
- newCondition
2. 基础并发组件 AQS
1. 基本字段
1. 重要字段
AQS 是维护了一个同步队列(双向链表),这个队列里面线程都是需要竞争锁的,没有竞争到的就在同步队列中等待。 head
和 tail
就指向队列的首尾。state
是一个标志字段,表示当前有多少线程在临界区。一般来说 state
只能是 0 或 1 但是由于锁是可重入的,所以也有大于 1 的情况。
除了一个同步队列还有 0~n 个等待队列,等待队列就是调用了 await
方法的线程,会被挂到调用了 await
的 condition
上面的等待队列,所以有多少个 condition
就有多少等待队列。
1 | //同步队列头指针 |
2. Node 节点
Node 节点也就是上文所提到的 同步队列
和 等待队列
中的元素,注意两个队列之间的元素类型是一样的因为他们之间会有相互移动转换的动作,这两个队列中的元素自然是线程,为了方便查找和表示 AQS 将线程封装到了 Node 节点中,构成双向队列。
1 | static final class Node { |
可以看到上面有一个 waitStatus
属性,代表了线程当前的状态,状态标识就是那些常量。具体如下:
SIGNAL: 正在执行的线程结束释放锁或者被取消执行,他必须唤醒后续的状态为 SIGNAL 节点
CANCELLED: 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点, 其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
CONDITION: 该标识的结点处于等待队列中(不是同步队列),结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE:在共享模式中,该状态标识结点的线程处于可运行状态。
0:代表初始化状态。
可以看到,Node 里面的主要字段就是一个状态标志位、一个线程的引用、用于构建链表的指针。注意,有三个指针,其中前两个 next
和 pre
是用来构建同步队列的(双向链表),后面 nextWaiter
是用来构建等待队列。所以说虽然同步队列和等待队列使用的同一个数据类型,数据结构是不同的,并且在后面我们会看到等待队列中的节点只有两种状态 Condition
和 CANCELLED
前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。
下面画图说明一下同步队列和等待队列的情况。
等待队列
同步队列
3. ConditionObject
这个内部类是等待唤醒机制的核心,在他上面绑定了一个等待队列。在这个类中使用了两个指针( firstWaiter/lastWaiter
)指向队列的首尾。这里主要看一下 await
、signal
和 signalAll
方法。
- await
当一个线程调用了await()相关的方法,那么首先构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,并释放锁直到被唤醒(移动到同步队列)、中断、超时才被队列中移出。被唤醒后的第一件事是抢锁和检查是否被中断,然后才是移除队列。被唤醒时候的状态应该为 SIGNAL ,而在方法中执行的移除队列的操作就是移除状态非 Condition 的节点。
1 | public final void await() throws InterruptedException { |
- signal/doSignal/signalAll
执行 signal 首先进行锁的判断,如果没有获取到独占锁就直接抛出异常。这也就是为什么只有拥有锁的线程才能执行 signal ,然后获取等待队列中的第一个节点执行 doSignal。
1 | public final void signal() { |
doSignal 方法主要就干了三个事 :
- 将被唤醒的节点从等待队列中移除(while 循环体),如果被唤醒的节点被取消了就继续唤醒后面的节点(transferForSignal 返回 false)
- 否则把这个节点加入到同步队列 ( enq 方法 )
- 当同步队列中当前节点的前驱被取消或者没办法唤醒时则唤醒这个线程 ( unpark ),这时候调用了 unpark 正好和 await 中的 park 相对应使得 await 的线程被唤醒,接着执行循环体判断自己已经被移入到同步队列了,接着就可以执行后面的获取锁的操作。
1 | private void doSignal(Node first) { |
有一个小问题,就是在某个线程中执行了别人的 signal 不会导致当前线程立即放弃锁,之所以会这样正是由于 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
这个判断,即前驱线程都结束了。比如下面的例子:
1 | package util.AQSTest; |
输出的结果始终是:
1 | runner 1 start |
我使用了工具对上面的代码进行了调试,大致说一下流程,顺便用来捋一捋等待唤醒机制。
首先 runner1 启动,获取到锁,打印出 “runner1 start” ,然后调用了 await 方法,此时 runner1 线程就执行了 AQS 中的 ConditionObject 中的 await 方法,该方法首先 new 了一个新的节点,把 runner1 封装到这个节点里面。挂在了 run1Con 的等待队列上,然后执行了释放锁并判断中断。紧接着 runner1 线程执行循环体判断是否被唤醒也就是是否在同步队列,显然这时候不在,就直接调用了 park 方法,执行休眠 1 秒钟操作, park 方法是 native 方法由操作系统实现。在上面线程释放锁的时候执行的操作是 fullyRelease
这个方法调用了 release
方法,而 release
方法中释放了锁之后,会检查同步队列中是否还有以前因为没抢到锁而等待的线程,如果有执行 unparkSuccessor
也就是唤醒同步队列中的后继线程。那么此时 runner2 会被唤醒,唤醒后就去抢锁,获取到 lock 锁后输出了 “runner2 start” ,然后 runner2 线程又会因为调用 await
处于和 runner1 同样的境地,也就是被放入 run2Con 的等待队列。好!此时 runner1 的超时时间到了,就会被 unpark 这个 unpark 是被操作系统调用的,之后继续执行循环体发现超时时间小于等于 0 ,则调用 transferAfterCancelledWait
里面调用了 enq
就是加入同步队列,接着开始竞争锁,开始执行 run2Con 上的 signal 此时 signal 调用 doSignal 先执行 do while 中的循环体,runner2 从 run2Con 的等待队列上移除,然后执行 transferForSignal
里面又调用了 enq
将他加入同步队列,并返回同步队列中的前驱,前驱节点状态不是 Cancelled 或者 可以被置为 SIGNAL 则 signal 方法结束。接着打印了 “runner1 exit” 。接着需要执行 finally 里面的释放锁的操作了,显然 unlock 肯定调用了 release ,而 release 会唤醒同步队列中的后继的线程,那么位于同步队列中的 runner2 之前的 park 状态就会被打断,从而跳出 while 循环,执行获取锁的操作。打印出 “runner2 exit” ,最后释放锁整个程序结束。
现在总算是吧 Condition 的等待唤醒机制弄清楚了。也把 AQS 中的两个内部类的功能都解释完了。接下来就看 AQS 中的方法。
2. 重要方法
- get/setState
- release/tryRelease/unparkSuccessor/fullyRelease
- acquire/tryAcquire/addWaiter/tryQueued
- acquireShared
- releaseShared
这些属于 AQS 中常用的方法,但是里面的核心方法都是模板方法,也就是说由继承他的子类来实现,所以只能看个大概的逻辑。一会等到讲 ReentrantLock 时再详细说这里面的方法。
3. ReentrantLock 内部类 Sync/fairSync/noFairSync
1. Sync
这三个内部类实际上是继承自 AQS ,也就是说 ReentrantLock 是采用了 AQS 作为自己的核心并发控制组件完成的一系列的锁操作,及等待唤醒机制。
首先看一下 Sync 他是后面两个的父类,他直接继承自 AQS 。AQS 中留了几个比较重要的模板方法 tryAcquire 、tryRelease 。这个方法直接实现了一些在公平锁和非公平锁中的通用操作,也就是释放锁的操作 tryRelease 。
tryRelease 的实现很简单,主要就是依赖于 AQS 中的 state 属性,如果state 值减去要释放的信号量为 0 则释放成功,否则失败。
1 | // 释放锁的公共操作 |
2. fairSync
   公平锁执行 lock 操作就是执行了 AQS 中的 acquire(1) 也就是请求一个锁资源。但是注意,在 AQS 中的 acquire 中的 tryAcquire 方法没有实现,所以必须由当前类实现。
   在 tryAcquire 中做的事情就是看是否有代码在临界区。没有则还要看同步队列中是否有线程等待,当只有这一个线程在获取锁的时候才能正常的获取锁,其他情况都失败。
1 | // 公平锁 |
3. noFairSync
同理,这个方法也需要实现 lock 和 tryAcquire 操作。在 lock 中直接判断是否有代码在临界区,没有则直接获取到锁,与公平锁不同的是:公平锁还判断了等待队列中是否有等待的线程。有在临界区的情况时执行 acquire 操作。同样的,首先要执行 tryAcquire 如果失败,加入同步队列并自旋获取锁。还是 tryAcquire 的实现,这里又调用了 nonfairTryAcquire。
1 | // 非公平锁 |
好了,现在我们 AQS 中的空的核心方法也被子类实现了,那么现在 fairSync 和 noFairSync 就算是一个完整的 AQS 了。此时看一下加解锁的流程。
只说公平锁,因为非公平锁就只是少了一个判断。
首先 sync 调用 lock 方法,让后 lock 调用了 AQS 的 acquire(1) 也就是获取一个锁资源。
acquire 就先调用 tryAcquire(1) 尝试获取锁,这时候代码又回调到 sync 中的实现的 tryAcquire 方法,这个方法先判断锁是否已经被别的线程使用,然后需要确定没有更早的线程在同步队列等待获取锁,才把当前线程设置为独占线程,并设置 state 值获取锁。但是如果有代码在临界区需要判断是否为当前线程,因为锁是可重入的。如果是当前线程则 state 加上请求锁的个数,返回。
这时候又回到 AQS 中,如果上面尝试获取锁的过程失败,就需要调用 addWaiter 将当前线程封装成一个独占节点,等待状态默认为 0,并且返回当前节点。
加入同步队列后,再调用 acquireQueued 方法,当此线程是同步队列中等待的第一个线程则自旋尝试获取锁,毕竟很可能正在执行的线程马上就会释放锁了,再进行休眠不合适。如果自旋获取锁失败则判断节点状态是否为 SIGNAL 然后执行等待操作。
锁获取成功则把当前节点设置为头结点,把 thread = null
至此,Acquire 方法执行结束。
然后调用 unlock 方法解锁操作。
解锁操作就没那么麻烦,首先还是调用到了 AQS 中的 release 方法,这个方法首先尝试解锁当前线程,又回调到了 sync 中的 tryRelease 。
tryRelease 逻辑比较简单,使用 AQS 中的 state 减去释放的资源数,等于 0 代表完全释放,否则释放失败。
如果 tryRelease 成功执行就要去唤醒同步队列中的后继节点,继续执行。
至此,release 方法执行完毕。
4. AQS 中的要方法
1. get/setState
这两个方法主要是对 state 变量的 volatile 的读写,其实里面就就是普通的 get/set 方法。但是注意的一点就是 state 是 volatile 的。
1 | // 对状态变量的 volatile 读写 |
2. release/tryRelease/unparkSuccessor/fullyRelease
这几个方法在一起说主要是因为他们之间存在调用链,首先来看 release 这个方法我们在上面也分析了,里面调用了 tryRelease 、unparkSuccessor。 也就是首先调用 tryRelease 来释放当前线程的锁,如果释放成功就调用 unparkSuccessor 来唤醒同步队列中后继节点。其中 tryRelease 是由子类来实现,里面的主要逻辑就是看当前的 state 变量的值在修改过后是否为0 。这里还有一个 fullRelease 主要是在 ConditionObject 中调用的,当执行 await 的操作的时会执行此方法释放锁。
1 | // 尝试释放锁 |
3. acquire/tryAcquire/addWaiter/acquireQueued
这个和上面的一样,在执行了 acquire 后,会去调用子类复写的 tryAcquire 方法,这个方法就是看有否有代码块在临界区,没有的话直接获取锁(非公平锁),设置 state,有的话要判断是不是当前线程能否进行重入操作,否则就获取失败。失败后会调用 addWaiter ,new 一个新的节点加入到同步队列,接着调用了 acquireQueued 如果这个节点是同步队列中的第一个等待的线程(但不是第一个节点,因为第一个节点是 thread=null 的运行中的线程)就自旋一段时间看能否获取到锁。不能则 park 等待。
1 | // 获取锁 |
5. 总结
其实到这里 ReentrantLock 已经讲完了,因为他底层全部调用的是 Sync 中的方法,也就是全都是调用了 AQS 中的方法。而 AQS 中的大部分重要的方法都已经看过了。